package com.hzzftech.watchdog.busi.core.executor.starter;

import com.hzzftech.watchdog.busi.config.KettleConfig;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.database.DatabaseMeta;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.logging.KettleLogStore;
import org.pentaho.di.repository.LongObjectId;
import org.pentaho.di.repository.ObjectId;
import org.pentaho.di.repository.Repository;
import org.pentaho.di.repository.kdr.KettleDatabaseRepository;
import org.pentaho.di.repository.kdr.KettleDatabaseRepositoryMeta;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import java.sql.Connection;
import java.util.Timer;
import java.util.TimerTask;


@Component
public class KettleEnvironmentStarter implements CommandLineRunner {
    public static Logger logger = LoggerFactory.getLogger(KettleEnvironmentStarter.class);

    public static KettleEnvironmentStarter instance = new KettleEnvironmentStarter();
    public KettleDatabaseRepositoryMeta repositoryMeta;
    public DatabaseMeta databaseMeta;
    public Repository repository;
    public ObjectId objectId = new LongObjectId(100);

    public final static long TIMER_DELAY = 60000;

    @Autowired
    private KettleConfig kettleConfig;

    @Override
    public void run(String... args) throws Exception {
        // 重要，初始化kettle环境
        KettleLogStore.init(5000, 720);
        KettleEnvironment.init();
        setApplicationContext();
    }

    private void setApplicationContext() {
        try {
            initDBMeta();
            repository = buildRepository();
            instance = this;
            timerTask();
        } catch (Exception e) {
            if (repository != null) {
                repository.disconnect();
            }
            logger.error(ExceptionUtils.getStackTrace(e));
        }
    }

    private void timerTask() {
        Timer timer = new Timer();
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                KettleDatabaseRepository kettleDatabaseRepository = (KettleDatabaseRepository) getRepository();
                boolean rs = false;
                try {
                    // 数据资源库连接
                    Connection connection =kettleDatabaseRepository.connectionDelegate.getDatabase().getConnection();
                    rs = connection.isClosed();
                    if (rs) {
                        kettleDatabaseRepository.disconnect();
                        kettleDatabaseRepository.connectionDelegate.getDatabase().disconnect();
                        kettleDatabaseRepository.connectionDelegate.getDatabase().connect();
                    }
                } catch (Exception e) {
                    logger.error(ExceptionUtils.getStackTrace(e));
                }
            }
        }, TIMER_DELAY, 600000);
    }

    private void initDBMeta() {
        String name =kettleConfig.getRepoName();
        if (StringUtils.isEmpty(name)) {
            name = kettleConfig.getIp() + "_" + kettleConfig.getDbName();
        }
        // 设置数据资源库链接相关元数据
        databaseMeta = new DatabaseMeta(name, kettleConfig.getDbVersion(), "Native",
                kettleConfig.getIp(), kettleConfig.getDbName(), kettleConfig.getPort(),
                kettleConfig.getUserName(), kettleConfig.getPassword());

        databaseMeta.setObjectId(objectId);
        databaseMeta.setShared(true);
        databaseMeta.addExtraOption(databaseMeta.getPluginId(), "characterEncoding", "UTF8" );
        databaseMeta.addExtraOption(databaseMeta.getPluginId(), "useUnicode", "true" );
        databaseMeta.addExtraOption(databaseMeta.getPluginId(), "autoReconnect", "true" );
        databaseMeta.addExtraOption(databaseMeta.getPluginId(), "autoReconnectForTools", "true" );
        databaseMeta.setVariable("EXTRA_OPTION_MYSQL.useSSL", "false");
        databaseMeta.setUsingConnectionPool(true);

        repositoryMeta = new KettleDatabaseRepositoryMeta();
        repositoryMeta.setName(name);
        repositoryMeta.setId("kt-watchdog-db-repo");
        repositoryMeta.setConnection(databaseMeta);
        repositoryMeta.setDescription(name);
    }

    /**
     * 构建数据资源库
     * @return
     * @throws KettleException
     */
    private Repository buildRepository() throws KettleException {
        KettleDatabaseRepository repository = new KettleDatabaseRepository();
        repository.init(repositoryMeta);
        repository.connect(kettleConfig.getRepoUserName(), kettleConfig.getRepoPassword());
        return repository;
    }

    public Repository getRepository() {
        if (repository == null) {
            try {
                repository = buildRepository();
            } catch (KettleException e) {
                logger.error(ExceptionUtils.getStackTrace(e));
            }
        }
        return repository;
    }
}
